Skip to main content

Example code for MQTT connection

This page will help you connect to MQTT by giving examples

In PHP

<?php
declare(strict_types=1);
echo "Hello World\n";
require "vendor/autoload.php";

use PhpMqtt\Client\ConnectionSettings;
use PhpMqtt\Client\Examples\Shared\SimpleLogger;
use PhpMqtt\Client\Exceptions\MqttClientException;
use PhpMqtt\Client\MqttClient;
use Psr\Log\LogLevel;

$host = 'mqtt.grouplinknetwork.com'; #env('MQTT_HOST'),
$port = 8883;
$clean_session = false;

$connectionSettings = (new ConnectionSettings)
->setConnectTimeout(10)
->setUseTls(true)
->setTlsSelfSignedAllowed(true)
->setKeepAliveInterval(60)
->setTlsClientCertificateFile('/home/usuario/mqttcrts/private.crt')
->setTlsCertificateAuthorityFile('/home/usuario/mqttcrts/grouplink-ca.crt')
->setTlsClientCertificateKeyFile('/home/usuario/mqttcrts/private.key');

// echo "ConnnectionSettings:";
// var_dump($connectionSettings);

$mqtt = new MqttClient($host, $port, 'acquax');
$mqtt->connect($connectionSettings, $clean_session);

// echo "MqttClient:";
// var_dump($mqtt);

$mqtt->registerLoopEventHandler(function ($mqtt, float $elapsedTime) {
if ($elapsedTime >= 590) {
$mqtt->interrupt();
}
});

$mqtt->subscribe('message/#', function ($topic, $message) {
var_dump($topic);
var_dump($message);
}, 1);

$mqtt->loop(true, false, 100);
?>

In C#

public async Task Connect_Client()
{
try
{
var ca = X509Certificate.CreateFromCertFile(@"grouplink-ca.crt");
var cert = new X509Certificate2(@"certificate-with-encrypted-key.pfx", "");

X509Chain ch = new X509Chain();
ch.ChainPolicy.ExtraStore.Add(ca);

Debug.WriteLine("If");
Debug.WriteLine(cert.Issuer);
Debug.WriteLine(cert.IssuerName);
Debug.WriteLine(cert.Subject);
Debug.WriteLine(cert.SubjectName);

var mqttFactory = new MqttFactory();

var mqttClient = mqttFactory.CreateMqttClient();

var mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("mqtt.grouplinknetwork.com", 8883)
.WithTls(new MqttClientOptionsBuilderTlsParameters()
{
UseTls = true,
SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
Certificates = new List<X509Certificate2> { cert },
CertificateValidationHandler = delegate { return true; }
})
.WithCleanSession()
.Build();

mqttClient.ApplicationMessageReceivedAsync += e =>
{
Debug.WriteLine("Received application message.");
Debug.WriteLine(e.ApplicationMessage.ConvertPayloadToString());
return Task.CompletedTask;
};

var response = await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic("message/gl-water-dash");
})
.Build();

var res = await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
Debug.WriteLine("MQTT client subscribed to topic.");
}
catch (Exception ex)
{
Debug.WriteLine("Exception");
Debug.WriteLine(ex.Message);
}

finally
{
Debug.WriteLine("Finally");
}
}

In JS

const mqtt = require("mqtt");
const fs = require("fs");

const caFile = fs.readFileSync("grouplink-ca.crt");
const certFile = fs.readFileSync("private.crt");
const keyFile = fs.readFileSync("private.key");

const options = {
protocol: "mqtts",
host: "mqtt.grouplinknetwork.com",
port: 8883,
ca: [caFile],
cert: certFile,
key: keyFile,
rejectUnauthorized: true,
clean: false, // default is true = remove messages when disconnected
clientId: 'yourOrgName', // update here, set with your org name
};

const client = mqtt.connect(options);
const topic = "message/your-org-name"; // update here
console.log("connecting...");

client.on("connect", function () {
console.log("connected");
client.subscribe(topic, {
qos: 1, // at least once delivery
},function (err) {
if (!err) {
console.log("subscribed to " + topic);
}
});
});

client.on("disconnect", function () {
console.log("disconnected!");
});

client.on("offline", function () {
// in small disconnections, there is a chance that mqtt will
// reconnect and consume again, so it's just a log
console.log("offline!");
});

client.on("error", function (e) {
console.log("error: " + JSON.stringify(e));
// flush to the database, mark error status, it reached here,
// the lib will stop trying to reconnect
process.exit(1);
});

client.on("message", function (topic, message) {
// send it to your persistence queue/layer
console.log(message.toString());
});